package com.example.window;

import com.example.bean.WaterSenSorFunction;
import com.example.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Created with IntelliJ IDEA.
 * ClassName: WindowReduceDemo
 * Package: com.example.window
 * Description: Demo of incremental window aggregation (ReduceFunction) over tumbling processing-time windows.
 * User: fzykd
 *
 * @Author: LQH
 * Date: 2023-07-19
 * Time: 17:41
 */

public class WindowReduceDemo {

    /**
     * Builds and runs the demo job: text lines are read from a socket, mapped to
     * {@link WaterSensor} records, keyed by sensor id, grouped into 5-second tumbling
     * processing-time windows, reduced per key inside each window, and printed.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Source: raw text lines from the socket, converted to WaterSensor records.
        SingleOutputStreamOperator<WaterSensor> sensors =
                env.socketTextStream("hadoop102", 7777).map(new WaterSenSorFunction());

        // Partition the stream by sensor id so each key is windowed independently.
        KeyedStream<WaterSensor, String> keyedById = sensors.keyBy(WaterSensor::getId);

        // Step 1 - window assigner: tumbling processing-time windows of 5 seconds.
        WindowedStream<WaterSensor, String, TimeWindow> fiveSecondWindows =
                keyedById.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        // Step 2 - window function: an incremental reduce. This is the window-level reduce,
        // not the keyed-stream aggregation operator of the same name. Its result is again
        // an ordinary (non-windowed) stream, which is printed directly.
        fiveSecondWindows.reduce(new VcSumReducer()).print();

        env.execute();
    }

    /**
     * Incremental window reducer for {@link WaterSensor} records of the same key.
     *
     * <p>Only records sharing a key (after {@code keyBy}) are passed in together, and the
     * first record of a window does not trigger a call. Each later record is folded into
     * the running result as it arrives, but nothing is emitted until the window fires;
     * only then is the window's final result sent downstream.
     */
    private static class VcSumReducer implements ReduceFunction<WaterSensor> {

        /**
         * Merges two records: keeps the id of the first, takes the timestamp of the second,
         * and combines both vc values with {@code +}.
         *
         * @param accumulated the result reduced so far for this key and window
         * @param incoming    the newly arrived record
         * @return a new record carrying the merged values
         * @throws Exception declared by the {@link ReduceFunction} contract
         */
        @Override
        public WaterSensor reduce(WaterSensor accumulated, WaterSensor incoming) throws Exception {
            // Debug trace of every invocation; the message text is kept exactly as-is.
            System.out.println("Reduce value1=" + accumulated + "value2=" + incoming);
            return new WaterSensor(
                    accumulated.getId(),
                    incoming.getTs(),
                    accumulated.getVc() + incoming.getVc());
        }
    }

}
